[AWS IoT Core] ルールのエラーアクションでKinesis Data Streamsのシャードから溢れたデータを再投入してみました
1 はじめに
CX事業本部の平内(SIN)です。
AWS IoT のルールで、Amazon Kinesis Data Streams(以下、Kinesis Data Streams)をアクションとして指定した場合、シャードの入力制限を超えたデータは、ロストすることになります。
今回は、シャードの制限を超えて溢れてしまったデータを、ルールのエラーアクションで指定したLambdaによって、再び、IoT CoreへPublishする事で、ロストをリカバリする要領を確認してみました。
溢れたデータを再投入する仕組みを作ったとしても、常時、入力制限を超えてしまっている様な場面では、対応できません。あくまでも、想定していたデータ量が一時的に超えてしまった場合のリカバリーというイメージです。
ここで紹介する仕組みは、ルールのアクションが、常にエラーになったりすると無限ループとなってしまうため、Payloadにマークを入れるなどして、想定外のループになってしまわないようにする注意が必要です。
2 正常な状態
最初に、シャードの制限内で想定通り動作している状況を作ってみます。
処理するLambdaでは、集計のために、受信したRecord数をログ出力しています。
import time def lambda_handler(event, context): print({ "name": "load_test", "records_len": len(event["Records"]) })
確認作業のための、IoT CoreへのMQTTの送信は、下記のブログで作成したものを使用しました。
上記を調整し、秒間900件のデータを10秒間送信しています。
AWS IoT Coreのログを確認すると、綺麗に毎秒900件到着していることが確認できます。
また、Lambdaの方も、概ね毎秒900件が処理されています。
3 データのロスト
続いて、データがロストする状況を作為してみます。
Kinesis Data Streamsでは、シャードを1に設定しているので、秒間1,000件を超えると、制限オーバーとなるはずです。
上記と同じ様に、毎秒900件送信し、3秒目と4秒目だけ倍の1800件としてみました。(総数、10,800件)
data_size:128 CLIENT_ID_PREFIX:AX CLIENT_MAX:100 RPS:900 PERIOD:10 [sec] 0 2021-06-13 00:32:40.400338 counter: 900 elapsed_time: 0.018278837203979492 sec 1 2021-06-13 00:32:41.401362 counter: 900 elapsed_time: 0.016971588134765625 sec 2 2021-06-13 00:32:42.402381 counter: 900 elapsed_time: 0.01862025260925293 sec 3 2021-06-13 00:32:43.403397 counter: 1800 elapsed_time: 0.03636598587036133 sec 4 2021-06-13 00:32:44.404393 counter: 1800 elapsed_time: 0.04102015495300293 sec 5 2021-06-13 00:32:45.405385 counter: 900 elapsed_time: 0.016520261764526367 sec 6 2021-06-13 00:32:46.406401 counter: 900 elapsed_time: 0.01676487922668457 sec 7 2021-06-13 00:32:47.407414 counter: 900 elapsed_time: 0.016756534576416016 sec 8 2021-06-13 00:32:48.408428 counter: 900 elapsed_time: 0.016646146774291992 sec 9 2021-06-13 00:32:49.409441 counter: 900 elapsed_time: 0.016554594039916992 sec wait... finish!
また、ルールのエラーアクションに、Lambdaを設定しました。
def lambda_handler(event, context): print(event)
以下が、結果です。
正常に処理された場合Lambdaの処理数は、10,800件となるはずですが、結果は、全部で10,564件となっており、236件がロストしているようです。
また、設定したエラーアクションもコールされています。
エラーアクションの方を集計してみると、ロストされた236件と一致していました。
4 ロストのリカバリ
エラーアクションに指定したLambdaのeventの内容は以下の様になっています。
{ "ruleName": "load_test", "topic": "topic/load_test", "cloudwatchTraceId": "0d3d5279-9381-eaa0-66cf-2ec3b4cba326", "clientId": "AX_0045", "base64OriginalPayload": "eyJwcm9kdWNlcl90aW1lc3RhbXAiOiAiMjAyMS0wNi0xMiAyMjo1MDowNS4xNTgiLCAiY291bnRlciI6IDg0NSwgImRhdGEiOiAiWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFgifQ==", "failures": [ { "failedAction": "KinesisAction", "failedResource": "load_test_stream", "errorMessage": "Failed to publish Kinesis message. The error received was Rate exceeded for shard shardId-000000000000 in stream load_test_stream under account 439028474478. (Service: AmazonKinesis; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: f861ee0e-0b85-322a-a15e-d25015cb7515; Proxy: null). Message arrived on: topic/load_test, Action: kinesis, Name: load_test_stream, PartitionKey: 84317fbf-e5bc-4448-a70c-b5480d206931" } ] }
エラーアクションに設定したLambdaで、base64OriginalPayloadをデコードすると、Payloadの内容がそのまま入っています。 そこで、それを再び IoT Core へPublishするように変更しました。(IoT Coreへのアクセスポリシーも追加)
import time import base64 import json import boto3 import ast iot = boto3.client('iot-data', region_name='ap-northeast-1') def lambda_handler(event, context): print(event) time.sleep(0.1) # 少し、ウエイトを置いてから、再度処理する payload = base64.b64decode(event["base64OriginalPayload"]).decode("utf-8") payload = ast.literal_eval(payload) topic = 'load_test' iot.publish( topic = topic, qos = 1, payload = json.dumps(payload) )
上記のLambdaをエラーアクションに設定していると、到着はやや乱れますが、総数で、10,800件処置されたことが確認できました。
この時、エラーアクションの方は、310回起動されています。
10,800件送ったPublishの到着は、11,110件となっています。
クライアントIDで区別してみると 当初のクライアントから到着しているのが、10,800件
それ以外(LambdaからのPublish)が、310件です。
5 最後に
今回は、一時的にシャードの制限を超えてしまって溢れたデータを、ルールのエラーアクションで、再び、IoT Coreへ送り直す事で、データのロストをリカバリーする要領を確認してみました。
なお、溢れたデータがリカバリーできても、結局、シャードの制限が増加する訳ではないので、エラーアクションがInvokeされた時点で、シャードの数の見直しは必要だと思います。